/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

// $Id: iomanager.cpp 2147 2013-08-14 20:44:44Z bwilkinson $
//
// C++ Implementation: iomanager
//
// Description:
//
//
// Author: Jason Rodriguez <jrodriguez@calpont.com>
//
//
//

#include "mcsconfig.h"

#define _FILE_OFFSET_BITS 64
#define _LARGEFILE64_SOURCE
#include <sys/mount.h>
#include <linux/fs.h>
#ifdef BLOCK_SIZE
#undef BLOCK_SIZE
#endif
#ifdef READ
#undef READ
#endif
#ifdef WRITE
#undef WRITE
#endif
#include <stdexcept>
#include <unistd.h>
#include <stdlib.h>
#include <string>
#include <sstream>
#include <tr1/unordered_map>
#include <tr1/unordered_set>
#include <set>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <errno.h>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_array.hpp>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <pthread.h>
//#define NDEBUG
#include <cassert>

using namespace std;

#include "configcpp.h"
using namespace config;

#include "messageobj.h"
#include "messageids.h"
using namespace logging;

#include "brmtypes.h"

#include "pp_logger.h"

#include "fsutils.h"

#include "rwlock_local.h"

#include "iomanager.h"
#include "liboamcpp.h"

#include "idbcompress.h"
using namespace compress;

#include "IDBDataFile.h"
#include "IDBPolicy.h"
#include "IDBLogger.h"
using namespace idbdatafile;

#include "mcsconfig.h"
#include "threadnaming.h"

// Hash set of BRM object ids (OIDs).
typedef tr1::unordered_set<BRM::OID_t> USOID;

// Globals owned by the primitive processor and defined in another translation
// unit; this file only references them.
namespace primitiveprocessor
{
// Logger used for the info/error messages emitted by this file.
extern Logger* mlp;
// Non-zero => data files are opened with IDBDataFile::USE_ODIRECT.
extern int directIOFlag;
// NOTE(review): purpose not evident from this part of the file.
extern int noVB;
}  // namespace primitiveprocessor

// Fallback definitions for platform-specific open() flags. Defining a missing
// flag as 0 makes it a no-op when OR'd into a flag mask, so code naming it still
// compiles on platforms that lack it.
#ifndef O_BINARY
#define O_BINARY 0
#endif
#ifndef O_LARGEFILE
#define O_LARGEFILE 0
#endif
#ifndef O_NOATIME
#define O_NOATIME 0
#endif

namespace
{
using namespace dbbc;
using namespace std;

// ANSI escape sequences used by the debug/trace output: the first switches to
// bold text, the second resets attributes and restores the default foreground.
const std::string boldStart = "\033[0;1m";
const std::string boldStop = "\033[0;39m";

// Open-file cache sizing constants (presumably the defaults behind
// MaxOpenFiles() / DecreaseOpenFilesCount() -- TODO confirm in the ctor).
constexpr uint32_t MAX_OPEN_FILES = 16384;
constexpr uint32_t DECREASE_OPEN_FILES = 4096;

// Stores in `tm` the elapsed time from tv1 to tv2, in (fractional) seconds.
// The result is negative when tv2 precedes tv1.
void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, double& tm)
{
  const double wholeSeconds = static_cast<double>(tv2.tv_sec - tv1.tv_sec);
  const double fractionalSeconds = 1.e-9 * (tv2.tv_nsec - tv1.tv_nsec);
  tm = wholeSeconds + fractionalSeconds;
}

// Bundles an ioManager pointer with a 32-bit thread id (thdId).
struct IOMThreadArg
{
  ioManager* iom;
  int32_t thdId;
};

using IOMThreadArg_t = IOMThreadArg;

/* structures shared across all iomanagers */
class FdEntry
{
 public:
  FdEntry() : oid(0), dbroot(0), partNum(0), segNum(0), fp(0), c(0), inUse(0), compType(0)
  {
    cmpMTime = 0;
  }

  FdEntry(const BRM::OID_t o, const uint16_t d, const uint32_t p, const uint16_t s, const int ct,
          IDBDataFile* f)
   : oid(o), dbroot(d), partNum(p), segNum(s), fp(f), c(0), inUse(0), compType(0)
  {
    cmpMTime = 0;

    if (oid >= 1000)
      compType = ct;
  }

  ~FdEntry()
  {
    delete fp;
    fp = 0;
  }

  BRM::OID_t oid;
  uint16_t dbroot;
  uint32_t partNum;
  uint16_t segNum;
  IDBDataFile* fp;
  uint32_t c;
  int inUse;

  CompChunkPtrList ptrList;

  int compType;
  bool isCompressed() const
  {
    return (oid >= 1000 && compType != 0);
  }
  time_t cmpMTime;
  friend ostream& operator<<(ostream& out, const FdEntry& o)
  {
    out << " o: " << o.oid << " f: " << o.fp << " d: " << o.dbroot << " p: " << o.partNum
        << " s: " << o.segNum << " c: " << o.c << " t: " << o.compType << " m: " << o.cmpMTime;
    return out;
  }
};

// Strict weak ordering for fdcache keys: lexicographic on
// (oid, dbroot, partNum, segNum). All other FdEntry fields (fp, counters,
// compType, ...) are ignored, so a key built with fp == NULL finds the
// cached owning entry.
struct fdCacheMapLessThan
{
  bool operator()(const FdEntry& lhs, const FdEntry& rhs) const
  {
    if (lhs.oid != rhs.oid)
      return lhs.oid < rhs.oid;

    if (lhs.dbroot != rhs.dbroot)
      return lhs.dbroot < rhs.dbroot;

    if (lhs.partNum != rhs.partNum)
      return lhs.partNum < rhs.partNum;

    return lhs.segNum < rhs.segNum;
  }
};

// Equality on the same identity fields that fdCacheMapLessThan orders by:
// (oid, dbroot, partNum, segNum).
struct fdMapEqual
{
  bool operator()(const FdEntry& lhs, const FdEntry& rhs) const
  {
    if (lhs.oid != rhs.oid || lhs.dbroot != rhs.dbroot)
      return false;

    return lhs.partNum == rhs.partNum && lhs.segNum == rhs.segNum;
  }
};

// Shared owning handle to a cached file entry.
using SPFdEntry_t = boost::shared_ptr<FdEntry>;
// The FD cache: key entries (built with fp == NULL) map to owning entries,
// ordered on (oid, dbroot, partNum, segNum) by fdCacheMapLessThan.
using FdCacheType_t = std::map<FdEntry, SPFdEntry_t, fdCacheMapLessThan>;

struct FdCountEntry
{
  FdCountEntry()
  {
  }
  FdCountEntry(const BRM::OID_t o, const uint16_t d, const uint32_t p, const uint16_t s, const uint32_t c,
               const FdCacheType_t::iterator it)
   : oid(o), dbroot(d), partNum(p), segNum(s), cnt(c), fdit(it)
  {
  }
  ~FdCountEntry()
  {
  }

  BRM::OID_t oid;
  uint16_t dbroot;
  uint32_t partNum;
  uint16_t segNum;
  uint32_t cnt;
  FdCacheType_t::iterator fdit;
};  // FdCountEntry

typedef FdCountEntry FdCountEntry_t;

struct fdCountCompare
{
  bool operator()(const FdCountEntry_t& lhs, const FdCountEntry_t& rhs) const
  {
    return lhs.cnt > rhs.cnt;
  }
};

// Usage-ordered snapshot of fdcache, built only while trimming the cache.
using FdCacheCountType_t = multiset<FdCountEntry_t, fdCountCompare>;

// Process-wide cache of open data files; lookups, inserts and evictions are
// done with fdMapMutex held.
FdCacheType_t fdcache;
boost::mutex fdMapMutex;
// Taken for reading by thr_popper for the duration of each request it serves.
rwlock::RWLock_local localLock;

// Returns the first address at or after `in` that is a multiple of `av` bytes.
//
// The result may lie up to av - 1 bytes past `in`, so callers must
// over-allocate by that much (e.g. new char[size + 4095] before aligning to
// 4096). Any positive alignment is handled, not only powers of two; av <= 1
// imposes no constraint and returns `in` unchanged (this also avoids a
// division by zero for av == 0).
char* alignTo(const char* in, int av)
{
  if (av <= 1)
    return const_cast<char*>(in);

  uintptr_t addr = reinterpret_cast<uintptr_t>(in);
  const uintptr_t alignment = static_cast<uintptr_t>(av);
  const uintptr_t misalignment = addr % alignment;

  if (misalignment != 0)
    addr += alignment - misalignment;

  return reinterpret_cast<char*>(addr);
}

// Linear back-off before a retry: sleeps count * 5 ms, so each successive
// attempt waits a little longer than the previous one.
void waitForRetry(long count)
{
  const long delayMicros = 5000 * count;
  usleep(delayMicros);
}

// Must hold the FD cache lock!
static int updateptrs(char* ptr, FdCacheType_t::iterator fdit)
{
  ssize_t i;
  uint32_t progress;

  // ptr is taken from buffer, already been checked: realbuff.get() == 0
  if (ptr == 0)
    return -1;

  // already checked before: fdit->second->isCompressed()
  if (fdit->second.get() == 0)
    return -2;

  IDBDataFile* fp = fdit->second->fp;

  if (fp == INVALID_HANDLE_VALUE)
  {
    Message::Args args;
    args.add("updateptrs got invalid fp.");
    primitiveprocessor::mlp->logInfoMessage(logging::M0006, args);
    return -3;
  }

  // We need to read one extra block because we need the first ptr in the 3rd block
  // to know if we're done.
  // FIXME: re-work all of this so we don't have to re-read the 3rd block.
  progress = 0;

  while (progress < 4096 * 3)
  {
    i = fp->pread(&ptr[progress], progress, (4096 * 3) - progress);

    if (i <= 0)
      break;

    progress += i;
  }

  if (progress != 4096 * 3)
    return -4;  // let it retry. Not likely, but ...

  fdit->second->cmpMTime = 0;
  time_t mtime = fp->mtime();

  if (mtime != (time_t)-1)
    fdit->second->cmpMTime = mtime;

  int gplRc = 0;
  gplRc = compress::CompressInterface::getPtrList(&ptr[4096], 4096, fdit->second->ptrList);

  if (gplRc != 0)
    return -5;  // go for a retry.

  if (fdit->second->ptrList.size() == 0)
    return -6;  // go for a retry.

  uint64_t numHdrs = fdit->second->ptrList[0].first / 4096ULL - 2ULL;

  if (numHdrs > 0)
  {
    boost::scoped_array<char> nextHdrBufsa(new char[numHdrs * 4096 + 4095]);
    char* nextHdrBufPtr = 0;

    nextHdrBufPtr = alignTo(nextHdrBufsa.get(), 4096);

    progress = 0;

    while (progress < numHdrs * 4096)
    {
      i = fp->pread(&nextHdrBufPtr[progress], (4096 * 2) + progress, (numHdrs * 4096) - progress);

      if (i <= 0)
        break;

      progress += i;
    }

    if (progress != numHdrs * 4096)
      return -8;

    CompChunkPtrList nextPtrList;
    gplRc = compress::CompressInterface::getPtrList(&nextHdrBufPtr[0], numHdrs * 4096, nextPtrList);

    if (gplRc != 0)
      return -7;  // go for a retry.

    fdit->second->ptrList.insert(fdit->second->ptrList.end(), nextPtrList.begin(), nextPtrList.end());
  }

  return 0;
}

void* thr_popper(ioManager* arg)
{
  utils::setThreadName("thr_popper");
  ioManager* iom = arg;
  FileBufferMgr* fbm;
  fileRequest* fr = 0;
  BRM::LBID_t lbid = 0;
  BRM::OID_t oid = 0;
  BRM::VER_t ver = 0;
  BRM::QueryContext qc;
  BRM::VER_t txn = 0;
  int compType = 0;
  int blocksLoaded = 0;
  int blocksRead = 0;
  const unsigned pageSize = 4096;
  fbm = &iom->fileBufferManager();
  char fileName[WriteEngine::FILE_NAME_SIZE];
  const uint64_t fileBlockSize = BLOCK_SIZE;
  bool flg = false;
  bool useCache;
  uint16_t dbroot = 0;
  uint32_t partNum = 0;
  uint16_t segNum = 0;
  uint32_t offset = 0;
  char* fileNamePtr = fileName;
  uint64_t longSeekOffset = 0;
  int err;
  uint32_t dlen = 0, acc, readSize, blocksThisRead, j;
  uint32_t blocksRequested = 0;
  ssize_t i;
  char* alignedbuff = 0;
  boost::scoped_array<char> realbuff;
  pthread_t threadId = 0;
  ostringstream iomLogFileName;
  ofstream lFile;
  struct timespec rqst1;
  struct timespec rqst2;
  struct timespec tm;
  struct timespec tm2;
  double tm3;
  double rqst3;
  bool locked = false;
  SPFdEntry_t fe;
  vector<CacheInsert_t> cacheInsertOps;
  bool copyLocked = false;

  if (iom->IOTrace())
  {
    threadId = pthread_self();
    iomLogFileName << MCSLOGDIR << "/trace/iom." << threadId;
    lFile.open(iomLogFileName.str().c_str(), ios_base::app | ios_base::ate);
  }

  FdCacheType_t::iterator fdit;
  IDBDataFile* fp = 0;
  size_t maxCompSz =
      compress::CompressInterface::getMaxCompressedSizeGeneric(iom->blocksPerRead * BLOCK_SIZE);
  size_t readBufferSz = maxCompSz + pageSize;

  realbuff.reset(new char[readBufferSz]);

  if (realbuff.get() == 0)
  {
    cerr << "thr_popper: Can't allocate space for a whole extent in memory" << endl;
    return 0;
  }

  alignedbuff = alignTo(realbuff.get(), 4096);

  if ((((ptrdiff_t)alignedbuff - (ptrdiff_t)realbuff.get()) >= (ptrdiff_t)pageSize) ||
      (((ptrdiff_t)alignedbuff % pageSize) != 0))
    throw runtime_error("aligned buffer size is not matching the page size.");

  uint8_t* uCmpBuf = 0;
  uCmpBuf = new uint8_t[4 * 1024 * 1024 + 4];

  for (;;)
  {
    if (copyLocked)
    {
      iom->dbrm()->releaseLBIDRange(lbid, blocksRequested);
      copyLocked = false;
    }

    if (locked)
    {
      localLock.read_unlock();
      locked = false;
    }

    fr = iom->getNextRequest();

    localLock.read_lock();
    locked = true;

    if (iom->IOTrace())
      clock_gettime(CLOCK_REALTIME, &rqst1);

    lbid = fr->Lbid();
    qc = fr->Ver();
    txn = fr->Txn();
    flg = fr->Flg();
    compType = fr->CompType();
    useCache = fr->useCache();
    blocksLoaded = 0;
    blocksRead = 0;
    dlen = fr->BlocksRequested();
    blocksRequested = fr->BlocksRequested();
    oid = 0;
    dbroot = 0;
    partNum = 0;
    segNum = 0;
    offset = 0;

    // special case for getBlock.
    iom->dbrm()->lockLBIDRange(lbid, blocksRequested);
    copyLocked = true;

    // special case for getBlock.
    if (blocksRequested == 1)
    {
      BRM::VER_t outVer;
      iom->dbrm()->vssLookup((BRM::LBID_t)lbid, qc, txn, &outVer, &flg);
      ver = outVer;
      fr->versioned(flg);
    }
    else
    {
      fr->versioned(false);
      ver = qc.currentScn;
    }

    err = iom->localLbidLookup(lbid, ver, flg, oid, dbroot, partNum, segNum, offset);

    if (err == BRM::ERR_SNAPSHOT_TOO_OLD)
    {
      ostringstream errMsg;
      errMsg << "thr_popper: version " << ver << " of LBID " << lbid << "is too old";
      iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
      continue;
    }
    else if (err < 0)
    {
      ostringstream errMsg;
      errMsg << "thr_popper: BRM lookup failure; lbid=" << lbid << "; ver=" << ver
             << "; flg=" << (flg ? 1 : 0);
      iom->handleBlockReadError(fr, errMsg.str(), &copyLocked, fileRequest::BRM_LOOKUP_ERROR);
      continue;
    }

#ifdef IDB_COMP_POC_DEBUG
    {
      boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);

      if (compType != 0)
        cout << boldStart;

      cout << "fileRequest: " << *fr << endl;

      if (compType != 0)
        cout << boldStop;
    }
#endif
    const uint32_t extentSize = iom->getExtentRows();
    FdEntry fdKey(oid, dbroot, partNum, segNum, compType, NULL);
    // cout << "Looking for " << fdKey << endl
    //   << "O: " << oid << " D: " << dbroot << " P: " << partNum << " S: " << segNum << endl;

    fdMapMutex.lock();
    fdit = fdcache.find(fdKey);

    if (fdit == fdcache.end())
    {
      try
      {
        iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
      }
      catch (exception& exc)
      {
        fdMapMutex.unlock();
        Message::Args args;
        args.add(oid);
        args.add(exc.what());
        primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
        ostringstream errMsg;
        errMsg << "thr_popper: Error building filename for OID " << oid << "; " << exc.what();
        iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
        continue;
      }

#ifdef IDB_COMP_USE_CMP_SUFFIX

      if (compType != 0)
      {
        char* ptr = strrchr(fileNamePtr, '.');
        idbassert(ptr);
        strcpy(ptr, ".cmp");
      }

#endif

      if (oid > 3000)
      {
        // TODO: should syscat columns be considered when reducing open file count
        //  They are always needed why should they be closed?
        if (fdcache.size() >= iom->MaxOpenFiles())
        {
          FdCacheCountType_t fdCountSort;

          for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
          {
            struct FdCountEntry fdc(it->second->oid, it->second->dbroot, it->second->partNum,
                                    it->second->segNum, it->second->c, it);

            fdCountSort.insert(fdc);
          }

          if (iom->FDCacheTrace())
          {
            iom->FDTraceFile() << "Before flushing sz: " << fdcache.size()
                               << " delCount: " << iom->DecreaseOpenFilesCount() << endl;

            for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
              iom->FDTraceFile() << *(*it).second << endl;

            iom->FDTraceFile() << "==================" << endl << endl;
          }

          // TODO: should we consider a minimum number of open files
          //      currently, there is nothing to prevent all open files
          //      from being closed by the IOManager.

          uint32_t delCount = 0;

          for (FdCacheCountType_t::reverse_iterator rit = fdCountSort.rbegin();
               rit != fdCountSort.rend() && fdcache.size() > 0 && delCount < iom->DecreaseOpenFilesCount();
               rit++)
          {
            FdEntry oldfdKey(rit->oid, rit->dbroot, rit->partNum, rit->segNum, 0, NULL);
            FdCacheType_t::iterator it = fdcache.find(oldfdKey);

            if (it != fdcache.end())
            {
              if (iom->FDCacheTrace())
              {
                if (!rit->fdit->second->inUse)
                  iom->FDTraceFile() << "Removing dc: " << delCount << " sz: " << fdcache.size()
                                     << *(*it).second << " u: " << rit->fdit->second->inUse << endl;
                else
                  iom->FDTraceFile() << "Skip Remove in use dc: " << delCount << " sz: " << fdcache.size()
                                     << *(*it).second << " u: " << rit->fdit->second->inUse << endl;
              }

              if (rit->fdit->second->inUse <= 0)
              {
                fdcache.erase(it);
                delCount++;
              }
            }
          }  // for (FdCacheCountType_t...

          if (iom->FDCacheTrace())
          {
            iom->FDTraceFile() << "After flushing sz: " << fdcache.size() << endl;

            for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
            {
              iom->FDTraceFile() << *(*it).second << endl;
            }

            iom->FDTraceFile() << "==================" << endl << endl;
          }

          fdCountSort.clear();

        }  // if (fdcache.size()...
      }    // if (oid > 3000)

      int opts = primitiveprocessor::directIOFlag ? IDBDataFile::USE_ODIRECT : 0;
      fp = NULL;
      uint32_t openRetries = 0;
      int saveErrno = 0;

      while (fp == NULL && openRetries++ < 5)
      {
        fp = IDBDataFile::open(IDBPolicy::getType(fileNamePtr, IDBPolicy::PRIMPROC), fileNamePtr, "r", opts);
        saveErrno = errno;

        if (fp == NULL)
          sleep(1);
      }

      if (fp == NULL)
      {
        Message::Args args;
        fdit = fdcache.end();
        fdMapMutex.unlock();
        args.add(oid);
        args.add(string(fileNamePtr) + ":" + strerror(saveErrno));
        primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
        ostringstream errMsg;
        errMsg << "thr_popper: Error opening file for OID " << oid << "; " << fileNamePtr << "; "
               << strerror(saveErrno);
        int errorCode = fileRequest::FAILED;

        if (saveErrno == EINVAL)
          errorCode = fileRequest::FS_EINVAL;
        else if (saveErrno == ENOENT)
          errorCode = fileRequest::FS_ENOENT;

        iom->handleBlockReadError(fr, errMsg.str(), &copyLocked, errorCode);
        continue;
      }

      fe.reset(new FdEntry(oid, dbroot, partNum, segNum, compType, fp));
      fe->inUse++;
      fdcache[fdKey] = fe;
      fdit = fdcache.find(fdKey);
      fe.reset();
    }

    else
    {
      if (fdit->second.get())
      {
        fdit->second->c++;
        fdit->second->inUse++;
        fp = fdit->second->fp;
      }
      else
      {
        Message::Args args;
        fdit = fdcache.end();
        fdMapMutex.unlock();
        args.add(oid);
        ostringstream errMsg;
        errMsg << "Null FD cache entry. (dbroot, partNum, segNum, compType) = (" << dbroot << ", " << partNum
               << ", " << segNum << ", " << compType << ")";
        args.add(errMsg.str());
        primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
        iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
        continue;
      }
    }

    fdMapMutex.unlock();

#ifdef SHARED_NOTHING_DEMO_2

    // change offset if it's shared nothing
    /* Get the extent #, divide by # of PMs, calculate base offset for the new extent #,
            add extent offset */
    if (oid >= 10000)
      offset = (((offset / extentSize) / iom->pmCount) * extentSize) + (offset % extentSize);

#endif

    longSeekOffset = (uint64_t)offset * (uint64_t)fileBlockSize;
    lldiv_t cmpOffFact = lldiv(longSeekOffset, (4LL * 1024LL * 1024LL));

    uint32_t readCount = 0;
    uint32_t bytesRead = 0;
    uint32_t compressedBytesRead =
        0;  // @Bug 3149.  IOMTrace was not reporting bytesRead correctly for compressed columns.
    uint32_t jend = blocksRequested / iom->blocksPerRead;

    if (iom->IOTrace())
      clock_gettime(CLOCK_REALTIME, &tm);

    ostringstream errMsg;
    bool errorOccurred = false;
    string errorString;
#ifdef IDB_COMP_POC_DEBUG
    bool debugWrite = false;
#endif

#ifdef EM_AS_A_TABLE_POC__
    dlen = 1;
#endif

    if (blocksRequested % iom->blocksPerRead)
      jend++;

    for (j = 0; j < jend; j++)
    {
      int decompRetryCount = 0;
      int retryReadHeadersCount = 0;

    decompRetry:
      blocksThisRead = std::min(dlen, iom->blocksPerRead);
      readSize = blocksThisRead * BLOCK_SIZE;

      acc = 0;

      while (acc < readSize)
      {
#if defined(EM_AS_A_TABLE_POC__)

        if (oid == 1084)
        {
          uint32_t h;
          int32_t o = 0;
          int32_t* ip;
          ip = (int32_t*)(&alignedbuff[acc]);

          for (o = 0; o < 2048; o++)
          {
            if (iom->dbrm()->getHWM(o + 3000, h) == 0)
              *ip++ = h;
            else
              *ip++ = numeric_limits<int32_t>::min() + 1;
          }

          i = BLOCK_SIZE;
        }
        else
          i = pread(fd, &alignedbuff[acc], readSize - acc, longSeekOffset);

#else

        if (fdit->second->isCompressed())
        {
        retryReadHeaders:
          // hdrs may have been modified since we cached them in fdit->second...
          time_t cur_mtime = numeric_limits<time_t>::max();
          int updatePtrsRc = 0;
          fdMapMutex.lock();
          time_t fp_mtime = fp->mtime();

          if (fp_mtime != (time_t)-1)
            cur_mtime = fp_mtime;

          if (decompRetryCount > 0 || retryReadHeadersCount > 0 || cur_mtime > fdit->second->cmpMTime)
            updatePtrsRc = updateptrs(&alignedbuff[0], fdit);

          fdMapMutex.unlock();

          int idx = cmpOffFact.quot;

          if (updatePtrsRc != 0 || idx >= (signed)fdit->second->ptrList.size())
          {
            // Due to race condition, the header on disk may not upated yet.
            // Log an info message and retry.
            if (retryReadHeadersCount == 0)
            {
              Message::Args args;
              args.add(oid);
              ostringstream infoMsg;
              iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
              infoMsg << "retry updateptrs for " << fileNamePtr << ". rc=" << updatePtrsRc << ", idx=" << idx
                      << ", ptr.size=" << fdit->second->ptrList.size();
              args.add(infoMsg.str());
              primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
            }

            if (++retryReadHeadersCount < 30)
            {
              waitForRetry(retryReadHeadersCount);
              fdit->second->cmpMTime = 0;
              goto retryReadHeaders;
            }
            else
            {
              // still fail after all the retries.
              errorOccurred = true;
              errMsg << "Error reading compression header. rc=" << updatePtrsRc << ", idx=" << idx
                     << ", ptr.size=" << fdit->second->ptrList.size();
              errorString = errMsg.str();
              break;
            }
          }

          // FIXME: make sure alignedbuff can hold fdit->second->ptrList[idx].second bytes
          if (fdit->second->ptrList[idx].second > maxCompSz)
          {
            errorOccurred = true;
            errMsg << "aligned buff too small. dataSize=" << fdit->second->ptrList[idx].second
                   << ", buffSize=" << maxCompSz;
            errorString = errMsg.str();
            break;
          }

          i = fp->pread(&alignedbuff[0], fdit->second->ptrList[idx].first, fdit->second->ptrList[idx].second);
#ifdef IDB_COMP_POC_DEBUG
          {
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
            cout << boldStart << "pread1.1(" << fp << ", 0x" << hex << (ptrdiff_t)&alignedbuff[0] << dec
                 << ", " << fdit->second->ptrList[idx].second << ", " << fdit->second->ptrList[idx].first
                 << ") = " << i << ' ' << cmpOffFact.quot << ' ' << cmpOffFact.rem << boldStop << endl;
          }
#endif

          // @bug3407, give it some retries if pread failed.
          if (i != (ssize_t)fdit->second->ptrList[idx].second)
          {
            // log an info message
            if (retryReadHeadersCount == 0)
            {
              Message::Args args;
              args.add(oid);
              ostringstream infoMsg;
              iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
              infoMsg << " Read from " << fileNamePtr << " failed at chunk " << idx
                      << ". (offset, size, actuall read) = (" << fdit->second->ptrList[idx].first << ", "
                      << fdit->second->ptrList[idx].second << ", " << i << ")";
              args.add(infoMsg.str());
              primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
            }

            if (++retryReadHeadersCount < 30)
            {
              waitForRetry(retryReadHeadersCount);
              fdit->second->cmpMTime = 0;
              goto retryReadHeaders;
            }
            else
            {
              errorOccurred = true;
              errMsg << "Error reading chunk " << idx;
              errorString = errMsg.str();
              break;
            }
          }

          compressedBytesRead += i;  // @Bug 3149.
          i = readSize;
        }
        else
        {
          i = fp->pread(&alignedbuff[acc], longSeekOffset, readSize - acc);
#ifdef IDB_COMP_POC_DEBUG
          {
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
            cout << "pread1.2(" << fp << ", 0x" << hex << (ptrdiff_t)&alignedbuff[acc] << dec << ", "
                 << (readSize - acc) << ", " << longSeekOffset << ") = " << i << ' ' << cmpOffFact.quot << ' '
                 << cmpOffFact.rem << endl;
          }
#endif
        }

#endif

        if (i < 0 && errno == EINTR)
        {
          continue;
        }
        else if (i < 0)
        {
          try
          {
            iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
          }
          catch (exception& exc)
          {
            cerr << "FileName Err:" << exc.what() << endl;
            strcpy(fileNamePtr, "unknown filename");
          }

          errorString = strerror(errno);
          errorOccurred = true;
          errMsg << "thr_popper: Error reading file for OID " << oid << "; "
                 << " fp " << fp << "; offset " << longSeekOffset << "; fileName " << fileNamePtr << "; "
                 << errorString;
          break;  // break from "while(acc..." loop
        }
        else if (i == 0)
        {
          iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
          errorString = "early EOF";
          errorOccurred = true;
          errMsg << "thr_popper: Early EOF reading file for OID " << oid << "; " << fileNamePtr;
          break;  // break from "while(acc..." loop
        }

        acc += i;
        longSeekOffset += (uint64_t)i;
        readCount++;
        bytesRead += i;
      }  // while(acc...

      //..Break out of "for (j..." read loop if error occurred
      if (errorOccurred)
      {
        Message::Args args;
        args.add(oid);
        args.add(errorString);
        primitiveprocessor::mlp->logMessage(logging::M0061, args, true);
        iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
        break;
      }

      blocksRead += blocksThisRead;

      if (iom->IOTrace())
        clock_gettime(CLOCK_REALTIME, &tm2);

      /* New bulk VSS lookup code */
      {
        vector<BRM::LBID_t> lbids;
        vector<BRM::VER_t> versions;
        vector<bool> isLocked;

        for (i = 0; (uint32_t)i < blocksThisRead; i++)
          lbids.push_back((BRM::LBID_t)(lbid + i) + (j * iom->blocksPerRead));

        if (blocksRequested > 1 || !flg)  // prefetch, or an unversioned single-block read
          iom->dbrm()->bulkGetCurrentVersion(lbids, &versions, &isLocked);
        else  // a single-block read that was versioned
        {
          versions.push_back(ver);
          isLocked.push_back(false);
        }

        uint8_t* ptr = (uint8_t*)&alignedbuff[0];

        if (blocksThisRead > 0 && fdit->second->isCompressed())
        {
          size_t blen = 4 * 1024 * 1024 + 4;
#ifdef IDB_COMP_POC_DEBUG
          {
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
            cout << "decompress(0x" << hex << (ptrdiff_t)&alignedbuff[0] << dec << ", "
                 << fdit->second->ptrList[cmpOffFact.quot].second << ", 0x" << hex << (ptrdiff_t)uCmpBuf
                 << dec << ", " << blen << ")" << endl;
          }
#endif

          std::unique_ptr<compress::CompressInterface> decompressor(
              compress::getCompressInterfaceByType(static_cast<uint32_t>(fdit->second->compType)));
          if (!decompressor)
          {
            // Use default?
            decompressor.reset(new compress::CompressInterfaceSnappy());
          }

          int dcrc = decompressor->uncompressBlock(
              &alignedbuff[0], fdit->second->ptrList[cmpOffFact.quot].second, uCmpBuf, blen);

          if (dcrc != 0)
          {
#ifdef IDB_COMP_POC_DEBUG
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
#endif

            if (++decompRetryCount < 30)
            {
              blocksRead -= blocksThisRead;
              waitForRetry(decompRetryCount);

              // log an info message every 10 retries
              if (decompRetryCount == 1)
              {
                Message::Args args;
                args.add(oid);
                ostringstream infoMsg;
                iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
                infoMsg << "decompress retry for " << fileNamePtr << " decompRetry chunk " << cmpOffFact.quot
                        << " code=" << dcrc;
                args.add(infoMsg.str());
                primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
              }

              goto decompRetry;
            }

            cout << boldStart << "decomp returned " << dcrc << boldStop << endl;

            errorOccurred = true;
            Message::Args args;
            args.add(oid);
            errMsg << "Error decompressing block " << cmpOffFact.quot << " code=" << dcrc
                   << " part=" << partNum << " seg=" << segNum;
            args.add(errMsg.str());
            primitiveprocessor::mlp->logMessage(logging::M0061, args, true);
            iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
            break;
          }

          // FIXME: why doesn't this work??? (See later for why)
          // ptr = &uCmpBuf[cmpOffFact.rem];
          memcpy(ptr, &uCmpBuf[cmpOffFact.rem], blocksThisRead * BLOCK_SIZE);

          // log the retries, if any
          if (retryReadHeadersCount > 0 || decompRetryCount > 0)
          {
            Message::Args args;
            args.add(oid);
            ostringstream infoMsg;
            iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
            infoMsg << "Successfully uncompress " << fileNamePtr << " chunk " << cmpOffFact.quot << " @";

            if (retryReadHeadersCount > 0)
              infoMsg << " HeaderRetry:" << retryReadHeadersCount;

            if (decompRetryCount > 0)
              infoMsg << " UncompressRetry:" << decompRetryCount;

            args.add(infoMsg.str());
            primitiveprocessor::mlp->logInfoMessage(logging::M0006, args);
          }
        }

        for (i = 0; useCache && (uint32_t)i < lbids.size(); i++)
        {
          if (!isLocked[i])
          {
#ifdef IDB_COMP_POC_DEBUG
            {
              if (debugWrite)
              {
                boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
                cout << boldStart << "i = " << i << ", ptr = 0x" << hex << (ptrdiff_t)&ptr[i * BLOCK_SIZE]
                     << dec << boldStop << endl;
                cout << boldStart;
#if 0
                                int32_t* i32p;
                                i32p = (int32_t*)&ptr[i * BLOCK_SIZE];

                                for (int iy = 0; iy < 2; iy++)
                                {
                                    for (int ix = 0; ix < 8; ix++, i32p++)
                                        cout << *i32p << ' ';

                                    cout << endl;
                                }

#else
                int64_t* i64p;
                i64p = (int64_t*)&ptr[i * BLOCK_SIZE];

                for (int iy = 0; iy < 2; iy++)
                {
                  for (int ix = 0; ix < 8; ix++, i64p++)
                    cout << *i64p << ' ';

                  cout << endl;
                }

#endif
                cout << boldStop << endl;
              }
            }
#endif
            cacheInsertOps.push_back(
                CacheInsert_t(lbids[i], versions[i], (uint8_t*)&alignedbuff[i * BLOCK_SIZE]));
          }
        }

        if (useCache)
        {
          blocksLoaded += fbm->bulkInsert(cacheInsertOps);
          cacheInsertOps.clear();
        }
      }

      dlen -= blocksThisRead;

    }  // for (j...

    fdMapMutex.lock();

    if (fdit->second.get())
      fdit->second->inUse--;

    fdit = fdcache.end();
    fdMapMutex.unlock();

    if (errorOccurred)
      continue;

    try
    {
      iom->dbrm()->releaseLBIDRange(lbid, blocksRequested);
      copyLocked = false;
    }
    catch (exception& e)
    {
      cout << "releaseRange: " << e.what() << endl;
    }

    fr->BlocksRead(blocksRead);
    fr->BlocksLoaded(blocksLoaded);

    // FIXME: This is why some code above doesn't work...
    if (fr->data != 0 && blocksRequested == 1)
      memcpy(fr->data, alignedbuff, BLOCK_SIZE);

    fr->frMutex().lock();
    fr->SetPredicate(fileRequest::COMPLETE);
    fr->frCond().notify_one();
    fr->frMutex().unlock();

    if (iom->IOTrace())
    {
      clock_gettime(CLOCK_REALTIME, &rqst2);
      timespec_sub(tm, tm2, tm3);
      timespec_sub(rqst1, rqst2, rqst3);

      // @Bug 3149.  IOMTrace was not reporting bytesRead correctly for compressed columns.
      uint32_t reportBytesRead = (compressedBytesRead > 0) ? compressedBytesRead : bytesRead;

      lFile << left << setw(5) << setfill(' ') << oid << right << setw(5) << setfill(' ')
            << offset / extentSize << " " << right << setw(11) << setfill(' ') << lbid << " " << right
            << setw(9) << bytesRead / (readCount << 13) << " " << right << setw(9) << blocksRequested << " "
            << right << setw(10) << fixed << tm3 << " " << right << fixed
            << ((double)(rqst1.tv_sec + (1.e-9 * rqst1.tv_nsec))) << " " << right << setw(10) << fixed
            << rqst3 << " " << right << setw(10) << fixed << longSeekOffset << " " << right << setw(10)
            << fixed << reportBytesRead << " " << right << setw(3) << fixed << dbroot << " " << right
            << setw(3) << fixed << partNum << " " << right << setw(3) << fixed << segNum << " " << endl;
    }

  }  // for(;;)

  delete[] uCmpBuf;

  lFile.close();

  // reaching here is an error...

  return 0;
}  // end thr_popper

}  // anonymous namespace

namespace dbbc
{
// Acquire the file-local reader/writer lock in shared (read) mode.
// Must be paired with releaseReadLock().
void setReadLock()
{
  localLock.read_lock();
}

// Release a shared (read) hold on the file-local lock taken by setReadLock().
void releaseReadLock()
{
  localLock.read_unlock();
}

// Empty the whole file-descriptor cache while holding the local lock
// exclusively, so no reader observes a partially cleared cache.
void dropFDCache()
{
  localLock.write_lock();
  fdcache.clear();  // every cached entry is discarded
  localLock.write_unlock();
}
// Remove the cache entry (if any) for each file listed in `files`.
// Entries are keyed by (oid, dbRoot, partition, segment, compression type).
// The whole purge runs under the exclusive local lock.
void purgeFDCache(std::vector<BRM::FileInfo>& files)
{
  localLock.write_lock();

  for (std::vector<BRM::FileInfo>::const_iterator file = files.begin(); file != files.end(); ++file)
  {
    FdEntry key(file->oid, file->dbRoot, file->partitionNum, file->segmentNum, file->compType, NULL);
    FdCacheType_t::iterator hit = fdcache.find(key);

    // Files that were never opened simply have no entry to drop.
    if (hit != fdcache.end())
      fdcache.erase(hit);
  }

  localLock.write_unlock();
}

ioManager::ioManager(FileBufferMgr& fbm, fileBlockRequestQueue& fbrq, int thrCount, int bsPerRead)
 : blocksPerRead(bsPerRead), fIOMfbMgr(fbm), fIOMRequestQueue(fbrq), fFileOp(false)
{
  if (thrCount <= 0)
    thrCount = 1;

  if (thrCount > 256)
    thrCount = 256;

  fConfig = Config::makeConfig();
  string val = fConfig->getConfig("DBBC", "IOMTracing");
  int temp = 0;

  if (val.length() > 0)
    temp = static_cast<int>(Config::fromText(val));

  if (temp > 0)
    fIOTrace = true;
  else
    fIOTrace = false;

  val = fConfig->getConfig("DBBC", "MaxOpenFiles");
  temp = 0;
  fMaxOpenFiles = MAX_OPEN_FILES;

  if (val.length() > 0)
    temp = static_cast<int>(Config::fromText(val));

  if (temp > 0)
    fMaxOpenFiles = temp;

  val = fConfig->getConfig("DBBC", "DecreaseOpenFilesCount");
  temp = 0;
  fDecreaseOpenFilesCount = DECREASE_OPEN_FILES;

  if (val.length() > 0)
    temp = static_cast<int>(Config::fromText(val));

  if (temp > 0)
    fDecreaseOpenFilesCount = temp;

  // limit the number of files closed
  if (fDecreaseOpenFilesCount > (uint32_t)(0.75 * fMaxOpenFiles))
    fDecreaseOpenFilesCount = (uint32_t)(0.75 * fMaxOpenFiles);

  val = fConfig->getConfig("DBBC", "FDCacheTrace");
  temp = 0;
  fFDCacheTrace = false;

  if (val.length() > 0)
    temp = static_cast<int>(Config::fromText(val));

  if (temp > 0)
  {
    fFDCacheTrace = true;
    FDTraceFile().open(string(MCSLOGDIR) + "/trace/fdcache", ios_base::ate | ios_base::app);
  }

  fThreadCount = thrCount;
  go();
}

// Write the on-disk file name for (oid, dbRoot, partNum, segNum) into file_name.
// Version-buffer requests arrive with dbRoot == 0 for legacy reasons. For those
// OIDs (< 1000), the real dbroot is looked up through DBRM before the name is built.
void ioManager::buildOidFileName(const BRM::OID_t oid, uint16_t dbRoot, const uint32_t partNum,
                                 const uint16_t segNum, char* file_name)
{
  const bool legacyVersionBufferRequest = (dbRoot == 0) && (oid < 1000);

  if (legacyVersionBufferRequest)
    dbRoot = fdbrm.getDBRootOfVBOID(oid);

  fFileOp.getFileNameForPrimProc(oid, file_name, dbRoot, partNum, segNum);
}

// Resolve an LBID/version to its file location (oid, dbRoot, partition,
// segment, block offset) through DBRM's lookupLocal(), returning its status code.
// When primitiveprocessor::noVB is positive, vbFlag is forced to false.
int ioManager::localLbidLookup(BRM::LBID_t lbid, BRM::VER_t verid, bool vbFlag, BRM::OID_t& oid,
                               uint16_t& dbRoot, uint32_t& partitionNum, uint16_t& segmentNum,
                               uint32_t& fileBlockOffset)
{
  const bool effectiveVBFlag = (primitiveprocessor::noVB > 0) ? false : vbFlag;

  return fdbrm.lookupLocal(lbid, verid, effectiveVBFlag, oid, dbRoot, partitionNum, segmentNum,
                           fileBlockOffset);
}

// Callable handed to boost::thread_group::create_thread(). Each thread runs
// thr_popper() for the ioManager pointer supplied at construction.
// The pointer is non-owning: the ioManager must outlive the thread.
struct LambdaKludge
{
  // explicit: an ioManager* should never silently convert into a thread functor.
  explicit LambdaKludge(ioManager* i) : iom(i)
  {
  }

  ioManager* iom;  // non-owning

  void operator()()
  {
    thr_popper(iom);
  }
};

// Start fThreadCount reader threads in fThreadArr, each running thr_popper().
// A failed create_thread() is logged to stderr and retried after one second
// until the requested number of threads has been started.
void ioManager::createReaders()
{
  int started = 0;

  while (started < fThreadCount)
  {
    try
    {
      fThreadArr.create_thread(LambdaKludge(this));
      ++started;
    }
    catch (std::exception& e)
    {
      std::cerr << "IOM::createReaders() caught " << e.what() << std::endl;
      sleep(1);
    }
  }
}

// Destructor: delegates to stop() (which currently performs no work).
ioManager::~ioManager()
{
  this->stop();
}

void ioManager::go(void)
{
  createReaders();
}

// Intended shutdown hook for the reader threads; it currently does nothing.
// The loop it once held only contained a commented-out
// pthread_detach(fThreadArr[idx]) placeholder.
// NOTE(review): the threads in fThreadArr are neither joined nor detached here,
// and thr_popper() loops indefinitely. Confirm whether shutdown should
// interrupt/join them before this object is destroyed.
void ioManager::stop()
{
}

// Pop the next fileRequest from the request queue.
// If pop() throws a std::exception, an error is written to stderr and
// a null pointer is returned instead.
fileRequest* ioManager::getNextRequest()
{
  fileRequest* next = 0;

  try
  {
    next = fIOMRequestQueue.pop();
  }
  catch (std::exception&)
  {
    std::cerr << "ioManager::getNextRequest() ERROR " << std::endl;
  }

  return next;
}

//------------------------------------------------------------------------------
// Report a block-read failure for `fr`:
//   1. release the request's LBID range in DBRM, clearing *copyLocked on success
//      (a release failure is only logged to stdout);
//   2. print errMsg to stderr and store errorCode/errMsg on the request;
//   3. mark the request COMPLETE and wake its waiting thread.
//------------------------------------------------------------------------------
void ioManager::handleBlockReadError(fileRequest* fr, const string& errMsg, bool* copyLocked, int errorCode)
{
  fileRequest& req = *fr;

  try
  {
    dbrm()->releaseLBIDRange(req.Lbid(), req.BlocksRequested());
    *copyLocked = false;
  }
  catch (std::exception& ex)
  {
    std::cout << "releaseRange on read error: " << ex.what() << std::endl;
  }

  std::cerr << errMsg << std::endl;
  req.RequestStatus(errorCode);
  req.RequestStatusStr(errMsg);

  // Publish completion under the request mutex so the waiter sees the status.
  req.frMutex().lock();
  req.SetPredicate(fileRequest::COMPLETE);
  req.frCond().notify_one();
  req.frMutex().unlock();
}

}  // namespace dbbc
